A simple communication infrastructure providing typed exchange channels.
This crate is part of the timely dataflow system, used primarily for its inter-worker communication. It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
Threads are spawned with an allocator::Generic, whose allocate method returns a pair of several send endpoints and one receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker, if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
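The shape described above (several send endpoints and one receive endpoint per worker, FIFO per pair, no fairness across senders) can be sketched with the standard library alone. This is an analogy built on std::sync::mpsc, not this crate's implementation; build_mesh and run are hypothetical helper names introduced for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Builds, for each of `peers` workers, one sender per destination and a
/// single receiver. A stand-in for what an allocator hands each worker.
fn build_mesh<T>(peers: usize) -> Vec<(Vec<Sender<T>>, Receiver<T>)> {
    let mut senders = Vec::with_capacity(peers);
    let mut receivers = Vec::with_capacity(peers);
    for _ in 0..peers {
        let (tx, rx) = channel();
        senders.push(tx);
        receivers.push(rx);
    }
    // Every worker gets its own clone of the full sender list.
    receivers
        .into_iter()
        .map(|rx| (senders.clone(), rx))
        .collect()
}

/// Each worker sends `per_peer` numbered messages to every worker, then
/// counts down the messages it expects. Returns what each worker received
/// as (source worker, sequence number) pairs, in arrival order.
fn run(peers: usize, per_peer: usize) -> Vec<Vec<(usize, usize)>> {
    let handles: Vec<_> = build_mesh::<(usize, usize)>(peers)
        .into_iter()
        .enumerate()
        .map(|(index, (senders, receiver))| {
            thread::spawn(move || {
                for tx in &senders {
                    for seq in 0..per_peer {
                        tx.send((index, seq)).expect("receiver dropped");
                    }
                }
                // No termination notification here either: count down.
                let expected = peers * per_peer;
                let mut received = Vec::with_capacity(expected);
                while received.len() < expected {
                    received.push(receiver.recv().expect("senders dropped early"));
                }
                received
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}

fn main() {
    for (worker, received) in run(2, 3).into_iter().enumerate() {
        // Messages from different sources may interleave arbitrarily, but
        // the sequence numbers from any one source arrive in order.
        println!("worker {} received {:?}", worker, received);
    }
}
```

The interleaving between sources varies from run to run, which is what "no fairness guarantees" means in practice; only the order within each source is fixed.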
To be communicated, a type must implement the Serialize trait. A default implementation of Serialize is provided for any type implementing Abomonation. To implement other serialization strategies, wrap your type and implement Serialize for your wrapper.
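The wrapper pattern mentioned above might look like the following sketch. WireFormat is a hypothetical stand-in trait defined locally so the example compiles on its own; the crate's Serialize trait may have different method names and signatures. WireAddr and round_trip are likewise illustrative names.

```rust
use std::net::Ipv4Addr;

/// Hypothetical stand-in for a serialization trait. This is NOT the
/// crate's Serialize trait, whose signature may differ.
trait WireFormat: Sized {
    /// Appends the encoded form of `self` to `bytes`.
    fn write_bytes(&self, bytes: &mut Vec<u8>);
    /// Decodes a value, or returns None if `bytes` is malformed.
    fn read_bytes(bytes: &[u8]) -> Option<Self>;
}

/// Wrapper around a type we do not own, so we can choose its encoding.
#[derive(Debug, Clone, Copy, PartialEq)]
struct WireAddr(Ipv4Addr);

impl WireFormat for WireAddr {
    fn write_bytes(&self, bytes: &mut Vec<u8>) {
        // Encode the address as its four octets.
        bytes.extend_from_slice(&self.0.octets());
    }

    fn read_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 4 {
            return None;
        }
        Some(WireAddr(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3])))
    }
}

/// Encodes and then decodes a value, as a channel would across a boundary.
fn round_trip<T: WireFormat>(value: &T) -> Option<T> {
    let mut bytes = Vec::new();
    value.write_bytes(&mut bytes);
    T::read_bytes(&bytes)
}

fn main() {
    let addr = WireAddr(Ipv4Addr::new(10, 0, 0, 1));
    println!("{:?}", round_trip(&addr));
}
```

Wrapping keeps the encoding decision local to the wrapper: the wrapped type is untouched, and only values that cross a channel pay for the conversion.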
Channel endpoints also implement a lower-level push and pull interface (through the Push and Pull traits), which is used for more precise control of resources.
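One way a push and pull interface can give more precise control of resources is by letting allocations travel back from the receiver to the sender. The sketch below defines its own Push and Pull traits as stand-ins; their signatures are an assumption made for illustration and may not match the crate's traits. Pusher, Puller, pipe, and demo are hypothetical names.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Stand-in traits (assumed signatures, defined locally for the sketch).
trait Push<T> {
    /// Takes the value out of `element`; may leave a reusable one behind.
    fn push(&mut self, element: &mut Option<T>);
}
trait Pull<T> {
    /// Exposes the next value, if any, in a slot the caller may mutate.
    fn pull(&mut self) -> &mut Option<T>;
}

/// Shared state: (values in flight, spent values returned for reuse).
type Shared<T> = Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>;

struct Pusher<T> {
    shared: Shared<T>,
}
struct Puller<T> {
    current: Option<T>,
    shared: Shared<T>,
}

impl<T> Push<T> for Pusher<T> {
    fn push(&mut self, element: &mut Option<T>) {
        let mut shared = self.shared.borrow_mut();
        if let Some(value) = element.take() {
            shared.0.push_back(value);
        }
        // Hand a previously consumed value back to the sender, if any.
        *element = shared.1.pop_front();
    }
}

impl<T> Pull<T> for Puller<T> {
    fn pull(&mut self) -> &mut Option<T> {
        let mut shared = self.shared.borrow_mut();
        // Whatever the consumer left in the slot is returned for reuse.
        if let Some(spent) = self.current.take() {
            shared.1.push_back(spent);
        }
        self.current = shared.0.pop_front();
        &mut self.current
    }
}

/// Creates a connected single-threaded pusher and puller.
fn pipe<T>() -> (Pusher<T>, Puller<T>) {
    let shared: Shared<T> = Rc::new(RefCell::new((VecDeque::new(), VecDeque::new())));
    let pusher = Pusher { shared: shared.clone() };
    let puller = Puller { current: None, shared };
    (pusher, puller)
}

/// Sends one buffer, consumes it, and returns the received bytes together
/// with the capacity of the buffer that came back to the sender.
fn demo() -> (Vec<u8>, usize) {
    let (mut pusher, mut puller) = pipe::<Vec<u8>>();
    let mut slot: Option<Vec<u8>> = Some(Vec::with_capacity(1024));
    slot.as_mut().unwrap().extend_from_slice(b"first");
    pusher.push(&mut slot); // nothing to recycle yet, so slot becomes None

    let received = {
        let buffer = puller.pull().as_mut().expect("one message is queued");
        let copy = buffer.clone();
        buffer.clear(); // leave the emptied buffer in the slot
        copy
    };
    let _ = puller.pull(); // queue is empty; the spent buffer is recycled
    pusher.push(&mut slot); // pushes nothing, picks up the recycled buffer
    let capacity = slot.as_ref().map_or(0, |buffer| buffer.capacity());
    (received, capacity)
}

fn main() {
    let (received, capacity) = demo();
    println!("received {} bytes, recycled capacity {}", received.len(), capacity);
}
```

Passing an Option by mutable reference lets either side leave something behind: the sender gives up its value, and in exchange may get back an allocation the receiver has finished with.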
# Examples
// configure for two threads, just one process.
let config = timely_communication::Configuration::Process(2);
// create a source of inactive loggers.
let logger = ::std::sync::Arc::new(|_| timely_communication::logging::BufferingLogger::new_inactive());
// initializes communication, spawns workers
let guards = timely_communication::initialize(config, logger, |mut allocator| {
    println!("worker {} started", allocator.index());
    // allocates pair of senders list and one receiver.
    let (mut senders, mut receiver, _) = allocator.allocate();
    // send typed data along each channel
    senders[0].send(format!("hello, {}", 0));
    senders[1].send(format!("hello, {}", 1));
    // no support for termination notification,
    // we have to count down ourselves.
    let mut expecting = 2;
    while expecting > 0 {
        if let Some(message) = receiver.recv() {
            println!("worker {}: received: <{}>", allocator.index(), message);
            expecting -= 1;
        }
    }
    // optionally, return something
    allocator.index()
});
// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
    for guard in guards.join() {
        println!("result: {:?}", guard);
    }
} else {
    println!("error in computation");
}
This should produce output like:
worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)